package com.parkingwang.learning.observertype;

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.observables.ConnectableObservable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;


public class ColdHotObservable {

    public static void main(String[] args) {

        Consumer<Long> subscirbe1 = new Consumer<Long>() {
            @Override
            public void accept(Long aLong) {
                System.out.println("s1=" + aLong);
            }
        };


        Consumer<Long> subscribe2 = new Consumer<Long>() {
            @Override
            public void accept(Long aLong) {
                System.out.println("s2=" + aLong);
            }
        };

        Observable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {
            @Override
            public void subscribe(ObservableEmitter<Long> emitter) throws Exception {
                Observable.interval(10, TimeUnit.MILLISECONDS)
                        .take(Integer.MAX_VALUE)
                        .subscribe(emitter::onNext);
            }
        }).observeOn(Schedulers.newThread());

        coldObservable(subscirbe1, subscribe2, observable);


//        hotObservable(subscirbe1, subscribe2, observable);


        //(refCount操作符)  hot
//        refCountOperators(subscirbe1, subscribe2, observable);

    }

    /**
     * 全部取消订阅,数据流重新开始
     * 局部取消订阅，数据流保从原来位置继续
     * @param subscirbe1
     * @param subscribe2
     * @param observable
     */
    private static void refCountOperators(Consumer<Long> subscirbe1, Consumer<Long> subscribe2, Observable<Long> observable) {

        Observable<Long> longObservable = observable.share();

//        ConnectableObservable<Long> publish = observable.publish();
//
//        publish.connect();
//
//        Observable<Long> longObservable = publish.refCount();


        Disposable disposable1 = subscibeSleep(subscirbe1, longObservable);
        Disposable disposable2 = subscibeSleep(subscribe2, longObservable);

        disposable1.dispose();
        disposable2.dispose();

        System.out.println("subscirbe1,subscirbe2取消订阅后重新订阅");
        subscibeSleep(subscirbe1, longObservable);
        subscibeSleep(subscribe2, longObservable);
    }

    /**
     * 线程安全，存在共享变量,不支持背压
     *
     * @param subscirbe1
     * @param subscribe2
     * @param observableL
     */
    private static void hotObservable(Consumer<Long> subscirbe1, Consumer<Long> subscribe2, Observable<Long> observableL) {


        ConnectableObservable<Long> observable = observableL.publish();

        //notice
        observable.connect();

        subscibeSleep(subscirbe1, observable);
        subscibeSleep(subscribe2, observable);
    }

    /**
     * 非线程安全，不存在共享变量
     *
     * @param subscirbe1
     * @param subscribe2
     * @param observable
     */
    private static void coldObservable(Consumer<Long> subscirbe1, Consumer<Long> subscribe2, Observable<Long> observable) {

        subscibeSleep(subscirbe1, observable);
        subscibeSleep(subscribe2, observable);
    }


    private static Disposable subscibeSleep(Consumer<Long> subscribe2, Observable<Long> longObservable) {
        Disposable disposable2 = longObservable.subscribe(subscribe2);

        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return disposable2;
    }

}
